package com.alibaba.sreworks.pmdb.services.oem;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.sreworks.pmdb.api.metric.MetricService;
import com.alibaba.sreworks.pmdb.common.exception.MetricExistException;
import com.alibaba.sreworks.pmdb.common.exception.ParamException;
import com.alibaba.sreworks.pmdb.domain.req.metric.MetricCreateReq;
import com.alibaba.sreworks.pmdb.operator.AppOperator;
import com.alibaba.sreworks.pmdb.operator.DwOperator;
import com.alibaba.sreworks.pmdb.operator.JobMasterOperator;
import com.hubspot.jinjava.Jinjava;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 内置应用健康数据Service
 *
 * @author: fangzong.lyj@alibaba-inc.com
 * @date: 2021/10/19 15:55
 */
@Slf4j
@Service
public class OemService {

    private String APP_CPU_EFFICIENCY_METRIC_NAME = "app_cpu_efficiency";
    private String APP_RAM_EFFICIENCY_METRIC_NAME = "app_ram_efficiency";
    private String APP_PVC_ALLOCATION_METRIC_NAME = "app_pvc_allocation";
    private String APP_RESP_TIME_AVG_METRIC_NAME = "app_resp_time_avg";
    private String APP_SUCCESS_RATE_METRIC_NAME = "app_success_rate";
    private String APP_UNREADY_POD_CNT_METRIC_NAME = "app_unready_pod_cnt";

    private String APP_METRICS = "[{\"name\":\"app_cpu_efficiency\",\"alias\":\"应用CPU水位\",\"type\":\"性能指标\",\"creator\":\"sreworks\",\"last_modifier\":\"sreworks\",\"description\":\"应用CPU水位\"},{\"name\":\"app_ram_efficiency\",\"alias\":\"应用RAM水位\",\"type\":\"性能指标\",\"creator\":\"sreworks\",\"last_modifier\":\"sreworks\",\"description\":\"应用RAM水位\"},{\"name\":\"app_pvc_allocation\",\"alias\":\"应用PVC分配量\",\"type\":\"性能指标\",\"creator\":\"sreworks\",\"last_modifier\":\"sreworks\",\"description\":\"应用PVC分配量GB\"},{\"name\":\"app_unready_pod_cnt\",\"alias\":\"应用unready状态POD数量\",\"type\":\"状态指标\",\"creator\":\"sreworks\",\"last_modifier\":\"sreworks\",\"description\":\"应用unready状态POD数量\"},{\"name\":\"app_resp_time_avg\",\"alias\":\"应用平均响应时间\",\"type\":\"性能指标\",\"creator\":\"sreworks\",\"last_modifier\":\"sreworks\",\"description\":\"应用平均响应时间\"},{\"name\":\"app_success_rate\",\"alias\":\"应用请求成功率\",\"type\":\"性能指标\",\"creator\":\"sreworks\",\"last_modifier\":\"sreworks\",\"description\":\"应用请求成功率SLA\"}]";

    private String DW_METRIC_DATA_MODEL_NAME = "METRIC_DATA";

    private String APP_BASIC_METRIC_JOB_NAME_FORMAT = "oem_%s_basic_metric_collect";
    private String APP_BASIC_METRIC_JOB_CONFIG_TEMPLATE = "{\"id\":94,\"gmtCreate\":1646306448721,\"gmtModified\":1646381186593,\"creator\":\"\",\"operator\":\"999999999\",\"appId\":\"\",\"name\":\"oem_{{appName}}_basic_metric_collect\",\"alias\":\"{{appName}}基础指标采集作业(内置)\",\"tags\":[\"{{appName}}\",\"basic_performance_metric\"],\"description\":null,\"options\":null,\"triggerType\":\"cron\",\"triggerConf\":{\"cronExpression\":\"20 * * * * ? *\",\"enabled\":true},\"scheduleType\":\"parallel\",\"scheduleConf\":{\"taskIdList\":[{\"id\":105,\"gmtCreate\":1646306448699,\"gmtModified\":1646380398830,\"creator\":\"\",\"operator\":\"999999999\",\"appId\":\"\",\"name\":\"oem_{{appName}}_ram_efficiency\",\"alias\":\"{{appName}}应用RAM水位(内置)\",\"execTimeout\":60,\"execType\":\"python\",\"execContent\":\"# coding: utf-8\\n\\nimport time\\nimport json\\nimport requests\\n\\nheaders = {}\\nhost = {\\n    \\\"dataset\\\": \\\"http://prod-dataops-dataset.sreworks-dataops.svc.cluster.local:80\\\",\\n    \\\"app\\\": \\\"http://prod-app-app.sreworks.svc.cluster.local:80\\\"\\n}\\n\\npage_size = 1000\\none_millisecond = 1000\\none_minute_millisecond = 60000\\none_hour_millisecond = 3600000\\n\\ndemo_app_id = \\\"{{appId}}\\\"\\n\\n\\ndef get_app_resource_limit(app_id):\\n    endpoint = host[\\\"app\\\"] + \\\"/appdev/app/listAll\\\"\\n    r = requests.get(endpoint, headers=headers)\\n    datas = r.json().get(\\\"data\\\", None)\\n\\n    limit = {}\\n    if datas:\\n        for data in datas:\\n            if data[\\\"appId\\\"] == app_id:\\n                limit = data.get(\\\"detailDict\\\", {}).get(\\\"resource\\\", {}).get(\\\"limits\\\", {})\\n    if limit:\\n        memory_unit = limit[\\\"memory\\\"][-1].upper()\\n        memory_number = int(limit[\\\"memory\\\"][0:-1])\\n        if memory_unit == \\\"T\\\":\\n            limit[\\\"memory\\\"] = memory_number * 1024\\n        elif memory_unit == \\\"M\\\":\\n            limit[\\\"memory\\\"] = memory_number / 1024.0\\n        else:\\n            limit[\\\"memory\\\"] = memory_number\\n\\n    return limit\\n\\n\\ndef _do_get_pod_resource_allocated(endpoint, start_timestamp, end_timestamp, app_id):\\n    basic_url = f'''{endpoint}?sTimestamp={start_timestamp}&eTimestamp={end_timestamp}&appId={app_id}&pageSize={page_size}'''\\n    page_num = 1\\n    datas = []\\n    while True:\\n        url = basic_url + \\\"&pageNum=\\\" + str(page_num)\\n        r = requests.get(url, headers=headers)\\n        if r.status_code != 200:\\n            break\\n\\n        ret = r.json().get(\\\"data\\\", None)\\n        if ret and ret.get(\\\"datas\\\"):\\n            datas.extend(ret.get(\\\"datas\\\"))\\n            _total_num = int(ret.get(\\\"totalNum\\\"))\\n            _page_size = int(ret.get(\\\"pageSize\\\"))\\n            _page_num = int(ret.get(\\\"pageNum\\\"))\\n            if _page_size > _total_num:\\n                break\\n            else:\\n                page_num = _page_num + 1\\n        else:\\n            break\\n\\n    return datas\\n\\n\\ndef get_pod_resource_allocated(app_id):\\n    resource_limit = get_app_resource_limit(app_id)\\n    memory_limit = resource_limit.get(\\\"memory\\\", 0)\\n    cpu_limit = resource_limit.get(\\\"cpu\\\", 0)\\n\\n    now_millisecond = int(time.time()) * one_millisecond\\n    current_minute_timestamp = now_millisecond - now_millisecond % one_minute_millisecond\\n    end_timestamp = current_minute_timestamp - one_minute_millisecond\\n    start_timestamp = end_timestamp - one_minute_millisecond\\n\\n    endpoint_cpu_core = host[\\\"dataset\\\"] + \\\"/interface/pod_resource_hours_allocation\\\"\\n    pod_resource_allocation_list = _do_get_pod_resource_allocated(endpoint_cpu_core, start_timestamp, end_timestamp, app_id)\\n\\n    result = {}\\n    for pod_resource_allocation in pod_resource_allocation_list:\\n        app_instance_id = pod_resource_allocation[\\\"appInstanceId\\\"]\\n        key = app_instance_id\\n        pod_ram_gb_hours_allocation = 0 if pod_resource_allocation[\\\"podRamGbHoursAllocation\\\"] is None else pod_resource_allocation[\\\"podRamGbHoursAllocation\\\"]\\n        pod_cpu_core_hours_allocation = 0 if pod_resource_allocation[\\\"podCpuCoreHoursAllocation\\\"] is None else pod_resource_allocation[\\\"podCpuCoreHoursAllocation\\\"]\\n        pod_pvc_gb_hours_allocation = 0 if pod_resource_allocation[\\\"podPVCGbHoursAllocation\\\"] is None else pod_resource_allocation[\\\"podPVCGbHoursAllocation\\\"]\\n        pod_cpu_core_hours_usage_avg = 0 if pod_resource_allocation[\\\"podCpuCoreHoursUsageAvg\\\"] is None else pod_resource_allocation[\\\"podCpuCoreHoursUsageAvg\\\"]\\n        pod_ram_gb_hours_usage_avg = 0 if pod_resource_allocation[\\\"podRamGbHoursUsageAvg\\\"] is None else pod_resource_allocation[\\\"podRamGbHoursUsageAvg\\\"]\\n        if key in result:\\n            result[key][\\\"podRamGbAllocation\\\"] += pod_ram_gb_hours_allocation\\n            result[key][\\\"podCpuCoreAllocation\\\"] += pod_cpu_core_hours_allocation\\n            result[key][\\\"podPVCGbAllocation\\\"] += pod_pvc_gb_hours_allocation\\n            result[key][\\\"podCpuCoreUsageAvg\\\"] += pod_cpu_core_hours_usage_avg\\n            result[key][\\\"podRamGbUsageAvg\\\"] += pod_ram_gb_hours_usage_avg\\n            result[key][\\\"ramEfficiency\\\"] = 1.0 if memory_limit == 0 else result[key][\\\"podRamGbUsageAvg\\\"] / float(memory_limit)\\n            result[key][\\\"cpuEfficiency\\\"] = 1.0 if cpu_limit == 0 else result[key][\\\"podCpuCoreUsageAvg\\\"] / float(cpu_limit)\\n        else:\\n            result[key] = {\\n                \\\"appInstanceId\\\": pod_resource_allocation[\\\"appInstanceId\\\"],\\n                \\\"appId\\\": pod_resource_allocation[\\\"appId\\\"],\\n                \\\"appInstanceName\\\": pod_resource_allocation[\\\"appInstanceName\\\"],\\n                \\\"podRamGbAllocation\\\": pod_ram_gb_hours_allocation,\\n                \\\"podCpuCoreAllocation\\\": pod_cpu_core_hours_allocation,\\n                \\\"podPVCGbAllocation\\\": pod_pvc_gb_hours_allocation,\\n                \\\"podCpuCoreUsageAvg\\\": pod_cpu_core_hours_usage_avg,\\n                \\\"podRamGbUsageAvg\\\": pod_ram_gb_hours_usage_avg,\\n                \\\"ramEfficiency\\\": 1.0 if memory_limit == 0 else pod_ram_gb_hours_usage_avg / float(memory_limit),\\n                \\\"cpuEfficiency\\\": 1.0 if cpu_limit == 0 else pod_cpu_core_hours_usage_avg / float(cpu_limit),\\n                \\\"id\\\": key + \\\"_\\\" + str(start_timestamp),\\n                \\\"timestamp\\\": start_timestamp\\n            }\\n\\n    metric_datas = []\\n    for app_instance_id, item in result.items():\\n        metric_data = {\\n            \\\"id\\\": item[\\\"id\\\"],\\n            \\\"timestamp\\\": item[\\\"timestamp\\\"],\\n            # \\\"value\\\": item[\\\"cpuEfficiency\\\"],\\n            \\\"value\\\": item[\\\"ramEfficiency\\\"],\\n            \\\"labels\\\": {\\n                \\\"app_instance_id\\\": app_instance_id,\\n                \\\"app_instance_name\\\": item[\\\"appInstanceName\\\"]\\n            }\\n        }\\n        metric_datas.append(metric_data)\\n    return metric_datas\\n\\n\\nprint(json.dumps(get_pod_resource_allocated(demo_app_id)))\\n\",\"execRetryTimes\":0,\"execRetryInterval\":0,\"varConf\":{},\"sceneType\":\"collect\",\"sceneConf\":{\"isPushQueue\":\"false\",\"syncDw\":\"true\",\"id\":{{metricDataModelId}},\"type\":\"model\",\"relatedMetricId\":{{appRamEfficiencyMetricId}},\"layer\":\"{{metricDataModelLayer}}\"}},{\"id\":106,\"gmtCreate\":1646306448704,\"gmtModified\":1646380484089,\"creator\":\"\",\"operator\":\"999999999\",\"appId\":\"\",\"name\":\"oem_{{appName}}_cpu_efficiency\",\"alias\":\"{{appName}}应用CPU水位(内置)\",\"execTimeout\":60,\"execType\":\"python\",\"execContent\":\"# coding: utf-8\\n\\nimport time\\nimport json\\nimport requests\\n\\nheaders = {}\\nhost = {\\n    \\\"dataset\\\": \\\"http://prod-dataops-dataset.sreworks-dataops.svc.cluster.local:80\\\",\\n    \\\"app\\\": \\\"http://prod-app-app.sreworks.svc.cluster.local:80\\\"\\n}\\n\\npage_size = 1000\\none_millisecond = 1000\\none_minute_millisecond = 60000\\none_hour_millisecond = 3600000\\n\\ndemo_app_id = \\\"{{appId}}\\\"\\n\\n\\ndef get_app_resource_limit(app_id):\\n    endpoint = host[\\\"app\\\"] + \\\"/appdev/app/listAll\\\"\\n    r = requests.get(endpoint, headers=headers)\\n    datas = r.json().get(\\\"data\\\", None)\\n\\n    limit = {}\\n    if datas:\\n        for data in datas:\\n            if data[\\\"appId\\\"] == app_id:\\n                limit = data.get(\\\"detailDict\\\", {}).get(\\\"resource\\\", {}).get(\\\"limits\\\", {})\\n    if limit:\\n        memory_unit = limit[\\\"memory\\\"][-1].upper()\\n        memory_number = int(limit[\\\"memory\\\"][0:-1])\\n        if memory_unit == \\\"T\\\":\\n            limit[\\\"memory\\\"] = memory_number * 1024\\n        elif memory_unit == \\\"M\\\":\\n            limit[\\\"memory\\\"] = memory_number / 1024.0\\n        else:\\n            limit[\\\"memory\\\"] = memory_number\\n\\n    return limit\\n\\n\\ndef _do_get_pod_resource_allocated(endpoint, start_timestamp, end_timestamp, app_id):\\n    basic_url = f'''{endpoint}?sTimestamp={start_timestamp}&eTimestamp={end_timestamp}&appId={app_id}&pageSize={page_size}'''\\n    page_num = 1\\n    datas = []\\n    while True:\\n        url = basic_url + \\\"&pageNum=\\\" + str(page_num)\\n        r = requests.get(url, headers=headers)\\n        if r.status_code != 200:\\n            break\\n\\n        ret = r.json().get(\\\"data\\\", None)\\n        if ret and ret.get(\\\"datas\\\"):\\n            datas.extend(ret.get(\\\"datas\\\"))\\n            _total_num = int(ret.get(\\\"totalNum\\\"))\\n            _page_size = int(ret.get(\\\"pageSize\\\"))\\n            _page_num = int(ret.get(\\\"pageNum\\\"))\\n            if _page_size > _total_num:\\n                break\\n            else:\\n                page_num = _page_num + 1\\n        else:\\n            break\\n\\n    return datas\\n\\n\\ndef get_pod_resource_allocated(app_id):\\n    resource_limit = get_app_resource_limit(app_id)\\n    memory_limit = resource_limit.get(\\\"memory\\\", 0)\\n    cpu_limit = resource_limit.get(\\\"cpu\\\", 0)\\n\\n    now_millisecond = int(time.time()) * one_millisecond\\n    current_minute_timestamp = now_millisecond - now_millisecond % one_minute_millisecond\\n    end_timestamp = current_minute_timestamp - one_minute_millisecond\\n    start_timestamp = end_timestamp - one_minute_millisecond\\n\\n    endpoint_cpu_core = host[\\\"dataset\\\"] + \\\"/interface/pod_resource_hours_allocation\\\"\\n    pod_resource_allocation_list = _do_get_pod_resource_allocated(endpoint_cpu_core, start_timestamp, end_timestamp, app_id)\\n\\n    result = {}\\n    for pod_resource_allocation in pod_resource_allocation_list:\\n        app_instance_id = pod_resource_allocation[\\\"appInstanceId\\\"]\\n        key = app_instance_id\\n        pod_ram_gb_hours_allocation = 0 if pod_resource_allocation[\\\"podRamGbHoursAllocation\\\"] is None else pod_resource_allocation[\\\"podRamGbHoursAllocation\\\"]\\n        pod_cpu_core_hours_allocation = 0 if pod_resource_allocation[\\\"podCpuCoreHoursAllocation\\\"] is None else pod_resource_allocation[\\\"podCpuCoreHoursAllocation\\\"]\\n        pod_pvc_gb_hours_allocation = 0 if pod_resource_allocation[\\\"podPVCGbHoursAllocation\\\"] is None else pod_resource_allocation[\\\"podPVCGbHoursAllocation\\\"]\\n        pod_cpu_core_hours_usage_avg = 0 if pod_resource_allocation[\\\"podCpuCoreHoursUsageAvg\\\"] is None else pod_resource_allocation[\\\"podCpuCoreHoursUsageAvg\\\"]\\n        pod_ram_gb_hours_usage_avg = 0 if pod_resource_allocation[\\\"podRamGbHoursUsageAvg\\\"] is None else pod_resource_allocation[\\\"podRamGbHoursUsageAvg\\\"]\\n        if key in result:\\n            result[key][\\\"podRamGbAllocation\\\"] += pod_ram_gb_hours_allocation\\n            result[key][\\\"podCpuCoreAllocation\\\"] += pod_cpu_core_hours_allocation\\n            result[key][\\\"podPVCGbAllocation\\\"] += pod_pvc_gb_hours_allocation\\n            result[key][\\\"podCpuCoreUsageAvg\\\"] += pod_cpu_core_hours_usage_avg\\n            result[key][\\\"podRamGbUsageAvg\\\"] += pod_ram_gb_hours_usage_avg\\n            result[key][\\\"ramEfficiency\\\"] = 1.0 if memory_limit == 0 else result[key][\\\"podRamGbUsageAvg\\\"] / float(memory_limit)\\n            result[key][\\\"cpuEfficiency\\\"] = 1.0 if cpu_limit == 0 else result[key][\\\"podCpuCoreUsageAvg\\\"] / float(cpu_limit)\\n        else:\\n            result[key] = {\\n                \\\"appInstanceId\\\": pod_resource_allocation[\\\"appInstanceId\\\"],\\n                \\\"appId\\\": pod_resource_allocation[\\\"appId\\\"],\\n                \\\"appInstanceName\\\": pod_resource_allocation[\\\"appInstanceName\\\"],\\n                \\\"podRamGbAllocation\\\": pod_ram_gb_hours_allocation,\\n                \\\"podCpuCoreAllocation\\\": pod_cpu_core_hours_allocation,\\n                \\\"podPVCGbAllocation\\\": pod_pvc_gb_hours_allocation,\\n                \\\"podCpuCoreUsageAvg\\\": pod_cpu_core_hours_usage_avg,\\n                \\\"podRamGbUsageAvg\\\": pod_ram_gb_hours_usage_avg,\\n                \\\"ramEfficiency\\\": 1.0 if memory_limit == 0 else pod_ram_gb_hours_usage_avg / float(memory_limit),\\n                \\\"cpuEfficiency\\\": 1.0 if cpu_limit == 0 else pod_cpu_core_hours_usage_avg / float(cpu_limit),\\n                \\\"id\\\": key + \\\"_\\\" + str(start_timestamp),\\n                \\\"timestamp\\\": start_timestamp\\n            }\\n\\n    metric_datas = []\\n    for app_instance_id, item in result.items():\\n        metric_data = {\\n            \\\"id\\\": item[\\\"id\\\"],\\n            \\\"timestamp\\\": item[\\\"timestamp\\\"],\\n            \\\"value\\\": item[\\\"cpuEfficiency\\\"],\\n            # \\\"value\\\": item[\\\"ramEfficiency\\\"],\\n            \\\"labels\\\": {\\n                \\\"app_instance_id\\\": app_instance_id,\\n                \\\"app_instance_name\\\": item[\\\"appInstanceName\\\"]\\n            }\\n        }\\n        metric_datas.append(metric_data)\\n    return metric_datas\\n\\n\\nprint(json.dumps(get_pod_resource_allocated(demo_app_id)))\\n\",\"execRetryTimes\":0,\"execRetryInterval\":0,\"varConf\":{},\"sceneType\":\"collect\",\"sceneConf\":{\"isPushQueue\":\"false\",\"syncDw\":\"true\",\"id\":{{metricDataModelId}},\"type\":\"model\",\"relatedMetricId\":{{appCpuEfficiencyMetricId}},\"layer\":\"{{metricDataModelLayer}}\"}},{\"id\":107,\"gmtCreate\":1646306448712,\"gmtModified\":1646306448712,\"creator\":\"\",\"operator\":\"\",\"appId\":\"\",\"name\":\"oem_{{appName}}_response_time_avg\",\"alias\":\"{{appName}}应用平均响应时间(内置)\",\"execTimeout\":60,\"execType\":\"python\",\"execContent\":\"# coding: utf-8\\n\\nimport datetime\\nimport json\\nimport time\\nimport requests\\nfrom gql import gql, Client\\nfrom gql.transport.aiohttp import AIOHTTPTransport\\n\\none_second_millisecond = 1000\\none_minutes_millisecond = 60000\\nfive_minutes_millisecond = 300000\\n\\nsw_duration_time_format = \\\"%Y-%m-%d %H%M\\\"\\nsw_duration_time_gap = one_minutes_millisecond\\n\\ndemo_app_id = \\\"{{appId}}\\\"\\n\\nhost = {\\n    \\\"app\\\": \\\"http://prod-app-app.sreworks.svc.cluster.local:80\\\",\\n    \\\"skywalking\\\": \\\"http://prod-dataops-skywalking-oap.sreworks-dataops.svc.cluster.local:12800/graphql\\\"\\n}\\nheaders = {}\\n\\n\\ndef convert_utc_to_local_dt(str_utc_time):\\n    # return datetime.datetime.strptime(str_utc_time, \\\"%Y-%m-%dT%H:%M:%S.%fZ\\\").replace(tzinfo=datetime.timezone.utc).astimezone(tz.tzlocal())\\n    return datetime.datetime.strptime(str_utc_time, \\\"%Y-%m-%dT%H:%M:%S.%fZ\\\").replace(tzinfo=datetime.timezone.utc) \\\\\\n        .astimezone(datetime.timezone(datetime.timedelta(hours=8)))\\n\\n\\ndef get_time_range(ts, delta_ts, forward_gap=0):\\n    \\\"\\\"\\\"\\n    时间范围\\n    :param ts:\\n    :param delta_ts:\\n    :param forward_gap: 默认前推一个delta\\n    :return:\\n    \\\"\\\"\\\"\\n    ts_integer = int(ts)\\n    delta_ts_integer = int(delta_ts)\\n\\n    end_ts = ts_integer - ts_integer % delta_ts_integer\\n    start_ts = end_ts - delta_ts_integer\\n\\n    delta_forward_ts_integer = delta_ts_integer * forward_gap\\n\\n    return start_ts - delta_forward_ts_integer, end_ts - delta_forward_ts_integer\\n\\n\\ndef build_graphql():\\n    query = gql(\\n        \\\"\\\"\\\"\\n        query queryData($condition: MetricsCondition!, $duration: Duration!) {\\n                readMetricsValues: readMetricsValues(condition: $condition, duration: $duration) {\\n                  label\\n                  values {\\n                    values {\\n                      value\\n                    }\\n                  }\\n                }\\n              }\\n    \\\"\\\"\\\"\\n    )\\n\\n    return query\\n\\n\\ndef get_app_instances():\\n    endpoint = host[\\\"app\\\"] + \\\"/appcenter/appInstance/allAppInstances?page=1&pageSize=1000000\\\"\\n    r = requests.get(endpoint, headers=headers)\\n    datas = r.json().get(\\\"data\\\", None)\\n    app_instances = []\\n    if datas:\\n        for data in datas[\\\"items\\\"]:\\n            if data[\\\"appId\\\"] == demo_app_id:\\n                app_instances.append(data)\\n    return app_instances\\n\\n\\ndef get_app_avg_rt(app_instance, start_ts, end_ts):\\n    start_dt = datetime.datetime.fromtimestamp(start_ts / one_second_millisecond).strftime(sw_duration_time_format)\\n    end_dt = datetime.datetime.fromtimestamp(end_ts / one_second_millisecond).strftime(sw_duration_time_format)\\n\\n    transport = AIOHTTPTransport(url=host[\\\"skywalking\\\"])\\n    client = Client(transport=transport, fetch_schema_from_transport=True)\\n\\n    duration = {\\n        \\\"start\\\": start_dt,\\n        \\\"end\\\": end_dt,\\n        \\\"step\\\": \\\"MINUTE\\\"\\n    }\\n    condition = {\\n        \\\"name\\\": \\\"service_resp_time\\\",\\n        \\\"entity\\\": {\\n            \\\"scope\\\": \\\"Service\\\",\\n            \\\"serviceName\\\": app_instance[\\\"appInstanceId\\\"],\\n            \\\"normal\\\": True\\n        }\\n    }\\n    params = {\\\"duration\\\": duration, \\\"condition\\\": condition}\\n    resp = client.execute(build_graphql(), variable_values=params)\\n    results = resp.get(\\\"readMetricsValues\\\", {}).get(\\\"values\\\", {}).get(\\\"values\\\", [])\\n    datas = []\\n    for i in range(len(results) - 1):\\n        data = {\\n            \\\"value\\\": results[i].get(\\\"value\\\", 0),\\n            \\\"timestamp\\\": start_ts + i * sw_duration_time_gap,\\n            \\\"labels\\\": {\\n                \\\"app_instance_id\\\": app_instance[\\\"appInstanceId\\\"],\\n                \\\"app_instance_name\\\": app_instance[\\\"appInstanceName\\\"]\\n            }\\n        }\\n        datas.append(data)\\n    return datas\\n\\n\\ndef collect():\\n    app_instances = get_app_instances()\\n    start_ts, end_ts = get_time_range(time.time() * one_second_millisecond, one_minutes_millisecond)\\n\\n    metric_datas = []\\n    for app_instance in app_instances:\\n        datas = get_app_avg_rt(app_instance, start_ts, end_ts)\\n        metric_datas.extend(datas)\\n    return metric_datas\\n\\n\\nprint(json.dumps(collect()))\\n\",\"execRetryTimes\":0,\"execRetryInterval\":0,\"varConf\":{},\"sceneType\":\"collect\",\"sceneConf\":{\"isPushQueue\":\"true\",\"syncDw\":\"true\",\"id\":{{metricDataModelId}},\"type\":\"model\",\"relatedMetricId\":{{appRespTimeAvgMetricId}},\"layer\":\"{{metricDataModelLayer}}\"}},{\"id\":108,\"gmtCreate\":1646306448716,\"gmtModified\":1646306448716,\"creator\":\"\",\"operator\":\"\",\"appId\":\"\",\"name\":\"oem_{{appName}}_success_rate\",\"alias\":\"{{appName}}应用每分钟调用成功率(内置)\",\"execTimeout\":60,\"execType\":\"python\",\"execContent\":\"# coding: utf-8\\n\\nimport datetime\\nimport json\\nimport time\\nimport requests\\nfrom gql import gql, Client\\nfrom gql.transport.aiohttp import AIOHTTPTransport\\n\\none_second_millisecond = 1000\\none_minutes_millisecond = 60000\\nfive_minutes_millisecond = 300000\\n\\nsw_duration_time_format = \\\"%Y-%m-%d %H%M\\\"\\nsw_duration_time_gap = one_minutes_millisecond\\n\\ndemo_app_id = \\\"{{appId}}\\\"\\n\\nhost = {\\n    \\\"app\\\": \\\"http://prod-app-app.sreworks.svc.cluster.local:80\\\",\\n    \\\"skywalking\\\": \\\"http://prod-dataops-skywalking-oap.sreworks-dataops.svc.cluster.local:12800/graphql\\\"\\n}\\nheaders = {}\\n\\ndef convert_utc_to_local_dt(str_utc_time):\\n    # return datetime.datetime.strptime(str_utc_time, \\\"%Y-%m-%dT%H:%M:%S.%fZ\\\").replace(tzinfo=datetime.timezone.utc).astimezone(tz.tzlocal())\\n    return datetime.datetime.strptime(str_utc_time, \\\"%Y-%m-%dT%H:%M:%S.%fZ\\\").replace(tzinfo=datetime.timezone.utc) \\\\\\n        .astimezone(datetime.timezone(datetime.timedelta(hours=8)))\\n\\n\\ndef get_time_range(ts, delta_ts, forward_gap=0):\\n    \\\"\\\"\\\"\\n    时间范围\\n    :param ts:\\n    :param delta_ts:\\n    :param forward_gap: 默认前推一个delta\\n    :return:\\n    \\\"\\\"\\\"\\n    ts_integer = int(ts)\\n    delta_ts_integer = int(delta_ts)\\n\\n    end_ts = ts_integer - ts_integer % delta_ts_integer\\n    start_ts = end_ts - delta_ts_integer\\n\\n    delta_forward_ts_integer = delta_ts_integer * forward_gap\\n\\n    return start_ts - delta_forward_ts_integer, end_ts - delta_forward_ts_integer\\n\\n\\ndef build_graphql():\\n    query = gql(\\n        \\\"\\\"\\\"\\n        query queryData($condition: MetricsCondition!, $duration: Duration!) {\\n            readMetricsValue: readMetricsValue(condition: $condition, duration: $duration)\\n        }\\n    \\\"\\\"\\\"\\n    )\\n\\n    return query\\n\\n\\ndef get_app_instances():\\n    endpoint = host[\\\"app\\\"] + \\\"/appcenter/appInstance/allAppInstances?page=1&pageSize=1000000\\\"\\n    r = requests.get(endpoint, headers=headers)\\n    datas = r.json().get(\\\"data\\\", None)\\n    app_instances = []\\n    if datas:\\n        for data in datas[\\\"items\\\"]:\\n            if data[\\\"appId\\\"] == demo_app_id:\\n                app_instances.append(data)\\n    return app_instances\\n\\n\\ndef get_app_sla(app_instance, start_ts, end_ts):\\n    start_dt = datetime.datetime.fromtimestamp(start_ts / one_second_millisecond).strftime(sw_duration_time_format)\\n    end_dt = datetime.datetime.fromtimestamp(end_ts / one_second_millisecond).strftime(sw_duration_time_format)\\n\\n    transport = AIOHTTPTransport(url=host[\\\"skywalking\\\"])\\n    client = Client(transport=transport, fetch_schema_from_transport=True)\\n\\n    duration = {\\n        \\\"start\\\": start_dt,\\n        \\\"end\\\": end_dt,\\n        \\\"step\\\": \\\"MINUTE\\\"\\n    }\\n    condition = {\\n        \\\"name\\\": \\\"service_sla\\\",\\n        \\\"entity\\\": {\\n            \\\"scope\\\": \\\"Service\\\",\\n            \\\"serviceName\\\": app_instance[\\\"appInstanceId\\\"],\\n            \\\"normal\\\": True\\n        }\\n    }\\n    params = {\\\"duration\\\": duration, \\\"condition\\\": condition}\\n    resp = client.execute(build_graphql(), variable_values=params)\\n    sla = resp.get(\\\"readMetricsValue\\\", 10000)\\n    data = {\\n        \\\"value\\\": sla,\\n        \\\"timestamp\\\": end_ts,\\n        \\\"labels\\\": {\\n            \\\"app_instance_id\\\": app_instance[\\\"appInstanceId\\\"],\\n            \\\"app_instance_name\\\": app_instance[\\\"appInstanceName\\\"]\\n        }\\n    }\\n    return data\\n\\n\\ndef collect():\\n    app_instances = get_app_instances()\\n    start_ts, end_ts = get_time_range(time.time() * one_second_millisecond, one_minutes_millisecond)\\n\\n    metric_datas = []\\n    for app_instance in app_instances:\\n        datas = get_app_sla(app_instance, start_ts, end_ts)\\n        metric_datas.append(datas)\\n    return metric_datas\\n\\n\\nprint(json.dumps(collect()))\",\"execRetryTimes\":0,\"execRetryInterval\":0,\"varConf\":{},\"sceneType\":\"collect\",\"sceneConf\":{\"isPushQueue\":\"true\",\"syncDw\":\"true\",\"id\":{{metricDataModelId}},\"type\":\"model\",\"relatedMetricId\":{{appSuccessRateMetricId}},\"layer\":\"{{metricDataModelLayer}}\"}},{\"id\":112,\"gmtCreate\":1646381167983,\"gmtModified\":1646381167983,\"creator\":\"999999999\",\"operator\":\"999999999\",\"appId\":\"\",\"name\":\"oem_{{appName}}_pvc_allocated\",\"alias\":\"{{appName}}应用PVC分配(内置)\",\"execTimeout\":60,\"execType\":\"python\",\"execContent\":\"# coding: utf-8\\n\\nimport time\\nimport json\\nimport requests\\n\\nheaders = {}\\nhost = {\\n    \\\"dataset\\\": \\\"http://prod-dataops-dataset.sreworks-dataops.svc.cluster.local:80\\\"\\n}\\n\\npage_size = 1000\\none_millisecond = 1000\\none_minute_millisecond = 60000\\none_hour_millisecond = 3600000\\n\\ndemo_app_id = \\\"{{appId}}\\\"\\n\\n\\ndef _do_get_pod_resource_allocated(endpoint, start_timestamp, end_timestamp, app_id):\\n    basic_url = f'''{endpoint}?sTimestamp={start_timestamp}&eTimestamp={end_timestamp}&appId={app_id}&pageSize={page_size}'''\\n    page_num = 1\\n    datas = []\\n    while True:\\n        url = basic_url + \\\"&pageNum=\\\" + str(page_num)\\n        r = requests.get(url, headers=headers)\\n        if r.status_code == 200:\\n            ret = r.json().get(\\\"data\\\", None)\\n            if ret:\\n                datas.extend(ret.get(\\\"datas\\\"))\\n                _total_num = int(ret.get(\\\"totalNum\\\"))\\n                _page_size = int(ret.get(\\\"pageSize\\\"))\\n                _page_num = int(ret.get(\\\"pageNum\\\"))\\n                if _page_size > _total_num:\\n                    break\\n                else:\\n                    page_num = _page_num + 1\\n            else:\\n                break\\n\\n    return datas\\n\\n\\ndef get_pod_resource_allocated(app_id):\\n    now_millisecond = int(time.time()) * one_millisecond\\n    current_minute_timestamp = now_millisecond - now_millisecond % one_minute_millisecond\\n    end_timestamp = current_minute_timestamp - one_minute_millisecond\\n    start_timestamp = end_timestamp - one_minute_millisecond\\n\\n    endpoint_cpu_core = host[\\\"dataset\\\"] + \\\"/interface/pod_resource_hours_allocation\\\"\\n    pod_resource_allocation_list = _do_get_pod_resource_allocated(endpoint_cpu_core, start_timestamp, end_timestamp, app_id)\\n\\n    result = {}\\n    for pod_resource_allocation in pod_resource_allocation_list:\\n        app_instance_id = pod_resource_allocation[\\\"appInstanceId\\\"]\\n        key = app_instance_id\\n        pod_pvc_gb_hours_allocation = 0 if pod_resource_allocation[\\\"podPVCGbHoursAllocation\\\"] is None else pod_resource_allocation[\\\"podPVCGbHoursAllocation\\\"]\\n        if key in result:\\n            result[key][\\\"podPVCGbAllocation\\\"] += pod_pvc_gb_hours_allocation\\n        else:\\n            result[key] = {\\n                \\\"appInstanceId\\\": pod_resource_allocation[\\\"appInstanceId\\\"],\\n                \\\"appId\\\": pod_resource_allocation[\\\"appId\\\"],\\n                \\\"appInstanceName\\\": pod_resource_allocation[\\\"appInstanceName\\\"],\\n                \\\"podPVCGbAllocation\\\": pod_pvc_gb_hours_allocation,\\n                \\\"id\\\": key + \\\"_\\\" + str(start_timestamp),\\n                \\\"timestamp\\\": start_timestamp\\n            }\\n\\n    metric_datas = []\\n    for app_instance_id, item in result.items():\\n        metric_data = {\\n            \\\"id\\\": item[\\\"id\\\"],\\n            \\\"timestamp\\\": item[\\\"timestamp\\\"],\\n            \\\"value\\\": item[\\\"podPVCGbAllocation\\\"],\\n            \\\"labels\\\": {\\n                \\\"app_instance_id\\\": app_instance_id,\\n                \\\"app_instance_name\\\": item[\\\"appInstanceName\\\"]\\n            }\\n        }\\n        metric_datas.append(metric_data)\\n    return metric_datas\\n\\n\\nprint(json.dumps(get_pod_resource_allocated(demo_app_id)))\\n\",\"execRetryTimes\":0,\"execRetryInterval\":0,\"varConf\":{},\"sceneType\":\"collect\",\"sceneConf\":{\"isPushQueue\":\"false\",\"syncDw\":\"true\",\"id\":{{metricDataModelId}},\"type\":\"model\",\"relatedMetricId\":{{appPVCAllocatedMetricId}},\"layer\":\"{{metricDataModelLayer}}\"}},{\"id\":187,\"gmtCreate\":1646645189211,\"gmtModified\":1646645189211,\"creator\":\"999999999\",\"operator\":\"999999999\",\"appId\":\"\",\"name\":\"oem_{{appName}}_unready_pod_cnt\",\"alias\":\"{{appName}}应用unready状态POD数量(内置)\",\"execTimeout\":60,\"execType\":\"python\",\"execContent\":\"# coding: utf-8\\n\\nimport time\\nimport json\\nimport requests\\n\\nheaders = {}\\nhost = {\\n    \\\"dataset\\\": \\\"http://prod-dataops-dataset.sreworks-dataops.svc.cluster.local:80\\\"\\n}\\n\\npage_size = 1000\\none_millisecond = 1000\\none_minute_millisecond = 60000\\none_hour_millisecond = 3600000\\n\\ndemo_app_id = \\\"{{appId}}\\\"\\n\\n\\ndef _do_get_app_pod_status(endpoint, start_timestamp, end_timestamp, app_id):\\n    basic_url = f'''{endpoint}?sTimestamp={start_timestamp}&eTimestamp={end_timestamp}&appId={app_id}&pageSize={page_size}'''\\n    page_num = 1\\n    datas = []\\n    while True:\\n        url = basic_url + \\\"&pageNum=\\\" + str(page_num)\\n        r = requests.get(url, headers=headers)\\n        if r.status_code == 200:\\n            ret = r.json().get(\\\"data\\\", None)\\n            if ret is not None:\\n                datas.extend(ret.get(\\\"datas\\\"))\\n                _total_num = int(ret.get(\\\"totalNum\\\"))\\n                _page_size = int(ret.get(\\\"pageSize\\\"))\\n                _page_num = int(ret.get(\\\"pageNum\\\"))\\n                if _page_size > _total_num:\\n                    break\\n                else:\\n                    page_num = _page_num + 1\\n    return datas\\n\\n\\ndef get_app_unready_pod_cnt(app_id):\\n    now_millisecond = int(time.time()) * one_millisecond\\n    current_minute_timestamp = now_millisecond - now_millisecond % one_minute_millisecond\\n    end_timestamp = current_minute_timestamp - one_minute_millisecond\\n    start_timestamp = end_timestamp - one_minute_millisecond\\n\\n    endpoint_pod_status = host[\\\"dataset\\\"] + \\\"/interface/app_pod_status\\\"\\n    app_pod_status_list = _do_get_app_pod_status(endpoint_pod_status, start_timestamp, end_timestamp, app_id)\\n\\n    result = {}\\n    for app_pod_status in app_pod_status_list:\\n        app_instance_id = app_pod_status[\\\"appInstanceId\\\"]\\n        key = app_instance_id\\n        ready_str = app_pod_status[\\\"podReady\\\"].lower()\\n        if key in result:\\n            if ready_str == \\\"true\\\":\\n                result[key][\\\"readyCnt\\\"] += 1\\n            else:\\n                result[key][\\\"unreadyCnt\\\"] += 1\\n        else:\\n            result[key] = app_pod_status\\n            if ready_str == \\\"true\\\":\\n                result[key][\\\"readyCnt\\\"] = 1\\n                result[key][\\\"unreadyCnt\\\"] = 0\\n            else:\\n                result[key][\\\"readyCnt\\\"] = 0\\n                result[key][\\\"unreadyCnt\\\"] = 1\\n\\n    metric_datas = []\\n    for app_instance_id, item in result.items():\\n        metric_data = {\\n            \\\"id\\\": app_instance_id + \\\"_\\\" + str(start_timestamp),\\n            \\\"timestamp\\\": start_timestamp,\\n            \\\"value\\\": item[\\\"unreadyCnt\\\"],\\n            \\\"labels\\\": {\\n                \\\"app_instance_id\\\": app_instance_id,\\n                \\\"app_instance_name\\\": item[\\\"appInstanceName\\\"]\\n            }\\n        }\\n        metric_datas.append(metric_data)\\n    return metric_datas\\n\\n\\nprint(json.dumps(get_app_unready_pod_cnt(demo_app_id)))\\n\",\"execRetryTimes\":0,\"execRetryInterval\":0,\"varConf\":{},\"sceneType\":\"collect\",\"sceneConf\":{\"isPushQueue\":\"true\",\"syncDw\":\"true\",\"id\":{{metricDataModelId}},\"type\":\"model\",\"relatedMetricId\":{{appUnreadyPodCntMetricIdd}},\"layer\":\"{{metricDataModelLayer}}\"}}]},\"sceneType\":\"collect\",\"sceneConf\":{},\"varConf\":{},\"notifyConf\":null,\"eventConf\":[]}";

    @Autowired
    MetricService metricService;

    @Autowired
    JobMasterOperator jobMasterOperator;

    @Autowired
    DwOperator dwOperator;

    @Autowired
    AppOperator appOperator;

    public void open(String appId) throws Exception {
        JSONObject app = getAppById(appId);
        String appName = app.getString("name");

        JSONObject metrics = addMetric(appId, appName);

        String jobName = String.format(APP_BASIC_METRIC_JOB_NAME_FORMAT, appName);
        JSONObject job = jobMasterOperator.getJobByName(jobName);
        if (CollectionUtils.isEmpty(job)) {
            JSONObject dwModel = dwOperator.getDWModel(DW_METRIC_DATA_MODEL_NAME);
            if (CollectionUtils.isEmpty(dwModel)) {
                throw new ParamException(String.format("指标数据[%s]模型不存在,请先创建数仓模型", DW_METRIC_DATA_MODEL_NAME));
            }

            Jinjava jinjava = new Jinjava();
            Map<String, Object> params = new HashMap<>();
            params.put("appId", appId);
            params.put("appName", appName);
            params.put("metricDataModelId", dwModel.getInteger("id"));
            params.put("metricDataModelLayer", dwModel.getString("layer"));

            Integer cpuMetricId = metrics.getInteger(APP_CPU_EFFICIENCY_METRIC_NAME) == null ? getMetricId(appId, appName, APP_CPU_EFFICIENCY_METRIC_NAME) : metrics.getInteger(APP_CPU_EFFICIENCY_METRIC_NAME);
            params.put("appCpuEfficiencyMetricId", cpuMetricId);

            Integer ramMetricId = metrics.getInteger(APP_RAM_EFFICIENCY_METRIC_NAME) == null ? getMetricId(appId, appName, APP_RAM_EFFICIENCY_METRIC_NAME) : metrics.getInteger(APP_RAM_EFFICIENCY_METRIC_NAME);
            params.put("appRamEfficiencyMetricId", ramMetricId);

            Integer pvcMetricId = metrics.getInteger(APP_PVC_ALLOCATION_METRIC_NAME) == null ? getMetricId(appId, appName, APP_PVC_ALLOCATION_METRIC_NAME) : metrics.getInteger(APP_PVC_ALLOCATION_METRIC_NAME);
            params.put("appPVCAllocatedMetricId", pvcMetricId);

            Integer unreadyPodCntMetricId = metrics.getInteger(APP_UNREADY_POD_CNT_METRIC_NAME) == null ? getMetricId(appId, appName, APP_UNREADY_POD_CNT_METRIC_NAME) : metrics.getInteger(APP_UNREADY_POD_CNT_METRIC_NAME);
            params.put("appUnreadyPodCntMetricIdd", unreadyPodCntMetricId);

            Integer respMetricId = metrics.getInteger(APP_RESP_TIME_AVG_METRIC_NAME) == null ? getMetricId(appId, appName, APP_RESP_TIME_AVG_METRIC_NAME) : metrics.getInteger(APP_RESP_TIME_AVG_METRIC_NAME);
            params.put("appRespTimeAvgMetricId", respMetricId);

            Integer successRateMetricId = metrics.getInteger(APP_SUCCESS_RATE_METRIC_NAME) == null ? getMetricId(appId, appName, APP_SUCCESS_RATE_METRIC_NAME) : metrics.getInteger(APP_SUCCESS_RATE_METRIC_NAME);
            params.put("appSuccessRateMetricId", successRateMetricId);

            String appMetricJobConfig = jinjava.render(APP_BASIC_METRIC_JOB_CONFIG_TEMPLATE, params);
            JSONObject jobConfig = JSONObject.parseObject(appMetricJobConfig);
            jobMasterOperator.createJob(Collections.singletonList(jobConfig));
            job = jobMasterOperator.getJobByName(jobName);
        }
        jobMasterOperator.toggleCronJob(job.getLong("id"), true);
    }

    private Integer getMetricId(String appId, String appName, String metricName) {
        JSONObject labels = new JSONObject();
        labels.put("app_id", appId);
        labels.put("app_name", appName);

        List<JSONObject> metrics = metricService.getMetrics(metricName, labels);
        if (!CollectionUtils.isEmpty(metrics)) {
            return metrics.get(0).getInteger("id");
        }
        return -1;
    }

    private JSONObject addMetric(String appId, String appName) {
        JSONObject labels = new JSONObject();
        labels.put("source", "oem");
        labels.put("app_id", appId);
        labels.put("app_name", appName);

        List<MetricCreateReq> reqs = JSONObject.parseArray(APP_METRICS, MetricCreateReq.class);
        JSONObject metrics = new JSONObject();
        reqs.forEach(req -> {
            req.setLabels(labels);
            try {
                int metricId = metricService.createMetric(req);
                metrics.put(req.getName(), metricId);
            } catch (MetricExistException ex) {
                log.warn(ex.getMessage());
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        });

        return metrics;
    }

    public boolean state(String appId) throws Exception {
        JSONObject app = getAppById(appId);
        String appName = app.getString("name");

        String jobName = String.format(APP_BASIC_METRIC_JOB_NAME_FORMAT, appName);
        JSONObject job = jobMasterOperator.getJobByName(jobName);
        if (!CollectionUtils.isEmpty(job)) {
            return job.getJSONObject("triggerConf").getBoolean("enabled");
        }
        return false;
    }

    public void close(String appId) throws Exception {
        JSONObject app = getAppById(appId);
        String appName = app.getString("name");

        String jobName = String.format(APP_BASIC_METRIC_JOB_NAME_FORMAT, appName);
        JSONObject job = jobMasterOperator.getJobByName(jobName);
        if (!CollectionUtils.isEmpty(job)) {
            // 仅停止作业
            jobMasterOperator.toggleCronJob(job.getLong("id"), false);
        }
    }

    private JSONObject getAppById(String appId)  throws Exception {
        JSONObject app = appOperator.getAppById(appId);
        if (CollectionUtils.isEmpty(app)) {
            throw new ParamException("应用定义查询失败");
        }
        return app;
    }
}
